iT邦幫忙

2021 iThome 鐵人賽

DAY 12
0
Software Development

從零開始Reactive Programming- Spring系列 第 13

[Day 12] Reactive Programming - Reactor(publishOn/subscribeOn)

  • 分享至 

  • xImage
  •  

前言

在上一篇介紹了Reactor提供Scheduler來幫助開發者,這篇就是來說明具體是如何使用。

publishOn

執行的方式與一般的operator一樣,會影響從publishOn以下的operator chain,改變其threading context,也就是改變執行緒,直到如果有下一個publishOn出現。
官方提供的範例,新增一個parallel-scheduler,在主流程裡面宣告Flux,最後新開一個Threadsubscribe(),這樣在publishOn之前的操作都會是new Thread裡面去執行,之後的則會是一開始宣告的Scheduler裡面。

Scheduler s = Schedulers.newParallel("parallel-scheduler", 2);

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)
    .publishOn(s)
    .map(i -> "value " + i);

Thread thread = new Thread(() -> flux.subscribe(System.out::println));
thread.start();
Thread.sleep(100);

另一篇Spring blog 提供的範例我覺得是可以更直覺的了解使用情境,假設需要去呼叫外部的一個阻斷式(blocking)的服務,如果沒有使用publishOn特別指定Scheduler,最後執行都會是在subscribe所處在的執行緒(main)裡面,就會等ABC處理完才會處理DE。

Flux.fromIterable(firstListOfUrls) //contains A, B and C 
    .map(url -> blockingWebClient.get(url)) 
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body)); 
Flux.fromIterable(secondListOfUrls) //contains D and E 
    .map(url -> blockingWebClient.get(url)) 
    .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
/*
main from first list, got A 
main from first list, got B 
main from first list, got C 
main from second list, got D 
main from second list, got E
*/

這時候如果加上publishOn,還記得上一篇介紹過最適合用於阻斷式服務的就是boundedElastic,就可以看到結果是穿插的,效能也就相對的更好。

Flux.fromIterable(firstListOfUrls) //contains A, B and C 
    .publishOn(Schedulers.boundedElastic()) 
    .map(url -> blockingWebClient.get(url)) 
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body)); 
Flux.fromIterable(secondListOfUrls) //contains D and E 
    .publishOn(Schedulers.boundedElastic()) 
    .map(url -> blockingWebClient.get(url)) 
    .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
/*
boundedElastic-1 from first list, got A 
boundedElastic-2 from second list, got D 
boundedElastic-1 from first list, got B 
boundedElastic-2 from second list, got E 
boundedElastic-1 from first list, got C
*/

subscribeOn

publishOn幾乎一模一樣,會去改變operator chain的threading context,差別在於publishOn只會改變之後的,不會朔及既往,而subscribeOn則是從頭到尾都改變,不論在哪一個位置,直到遇到publishOn為止,也就是如果有publishOn,那之後的operator 仍然會依照publishOn所指定的Scheduler。將上一個範例直接拿來改成使用subscribeOn,那兩個map都會是在一開始宣告的Scheduler內。

Scheduler s = Schedulers.newParallel("parallel-scheduler", 2); 
final Flux<String> flux = Flux 
    .range(1, 2) 
    .map(i -> 10 + i) 
    .subscribeOn(s) 
    .map(i -> "value " + i); 
Thread thread = new Thread(() -> flux.subscribe(System.out::println)); 
thread.start(); 
Thread.sleep(100);

結語

至於為什麼有了publishOn還需要subscribeOn,理論上你只需要將publishOn放在最前面就能夠取代subscribeOn,而且我認為即便是放後面結果一樣,為了可讀性subscribeOn應該也要放在最前面。其實原因就是有些情境當你沒辦法放在最前面的時候,假設有一個api或是function是別人寫好的你無法去更改,如果只有publishOn是無法更改到上一層(upstream),這時候就需要靠subscribeOn來處理。

最後還有一件事情從main thread跳到其他的執行緒可以透過以上的方法,但是從其他執行緒想要再跳回main是不可能的,雖然我也無法理解為何會有這樣的需求。

資料來源


上一篇
[Day 11] Reactive Programming - Reactor(Scheduler)
下一篇
[Day 13] Reactive Programming - Reactor(Processors & Sinks)
系列文
從零開始Reactive Programming- Spring32
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言